home *** CD-ROM | disk | FTP | other *** search
- # -*- coding: utf-8 -*-
- __author__ = 'Robert Ancell <bob27@users.sourceforge.net>'
- __license__ = 'GNU General Public License Version 2'
- __copyright__ = 'Copyright 2005-2006 Robert Ancell'
-
- import os
- import sys
- import select
- import signal
- import xml.dom.minidom
- import xml.parsers.expat
-
- import game
- import cecp
- import uci
-
- from defaults import *
-
- CECP = 'CECP'
- UCI = 'UCI'
-
- class Option:
- """
- """
-
- value = ''
-
- class Level:
- """
- """
-
- def __init__(self):
- self.options = []
-
- class Profile:
- """
- """
-
- def __init__(self):
- self.name = ''
- self.protocol = ''
- self.path = ''
- self.executables = []
- self.arguments = []
- self.profiles = {}
-
- def detect(self):
- """
- """
- try:
- path = os.environ['PATH'].split(os.pathsep)
- except KeyError:
- path = []
-
- for directory in path:
- for executable in self.executables:
- b = directory + os.sep + executable
- if os.path.isfile(b):
- self.path = b
- return
- self.path = None
-
- def _getXMLText(node):
- """
- """
- if len(node.childNodes) == 0:
- return ''
- if len(node.childNodes) > 1 or node.childNodes[0].nodeType != node.TEXT_NODE:
- raise ValueError
- return node.childNodes[0].nodeValue
-
- def _loadLevel(node):
- """
- """
- level = Level()
- n = node.getElementsByTagName('name')
- if len(n) != 1:
- return None
- level.name = _getXMLText(n[0])
-
- for e in node.getElementsByTagName('option'):
- option = Option()
- option.value = _getXMLText(e)
- try:
- attribute = e.attributes['name']
- except KeyError:
- pass
- else:
- option.name = _getXMLText(attribute)
- level.options.append(option)
-
- return level
-
- def loadProfiles():
- """
- """
- profiles = []
-
- fileNames = [os.path.expanduser(LOCAL_AI_CONFIG), os.path.join(BASE_DIR, 'ai.xml'), 'ai.xml']
- document = None
- for f in fileNames:
- try:
- document = xml.dom.minidom.parse(f)
- except IOError:
- pass
- except xml.parsers.expat.ExpatError:
- print 'AI configuration from %s is invalid, ignoring' % f
- else:
- #print 'Loading AI configuration from %s' % f
- break
- if document is None:
- print 'WARNING: No AI configuration'
- return profiles
-
- elements = document.getElementsByTagName('aiconfig')
- if len(elements) == 0:
- return profiles
-
- for e in elements:
- for p in e.getElementsByTagName('ai'):
- try:
- protocolName = p.attributes['type'].nodeValue
- except KeyError:
- assert(False)
- if protocolName == 'cecp':
- protocol = CECP
- elif protocolName == 'uci':
- protocol = UCI
- else:
- assert(False), 'Uknown AI type: %s' % repr(protocolName)
-
- n = p.getElementsByTagName('name')
- assert(len(n) > 0)
- name = _getXMLText(n[0])
-
- executables = []
- n = p.getElementsByTagName('binary')
- assert(len(n) > 0)
- for x in n:
- executables.append(_getXMLText(x))
-
- arguments = []
- for x in p.getElementsByTagName('argument'):
- arguments.append(_getXMLText(x))
-
- levels = {}
- for x in p.getElementsByTagName('level'):
- level = _loadLevel(x)
- if level is not None:
- levels[level.name] = level
-
- profile = Profile()
- profile.name = name
- profile.protocol = protocol
- profile.executables = executables
- profile.arguments = arguments
- profile.levels = levels
- profiles.append(profile)
-
- return profiles
-
- class CECPConnection(cecp.Connection):
- """
- """
-
- def __init__(self, player):
- """
- """
- self.player = player
- cecp.Connection.__init__(self)
-
- def onOutgoingData(self, data):
- """Called by cecp.Connection"""
- self.player.logText(data, 'output')
- self.player.sendToEngine(data)
-
- def onMove(self, move):
- """Called by cecp.Connection"""
- if self.player.isReadyToMove():
- self.player.moving = True
- self.player.move(move)
- else:
- assert(self.player.suppliedMove is None)
- self.player.suppliedMove = move
-
- def onResign(self):
- """Called by cecp.Connection"""
- self.player.resign()
-
- def onDraw(self):
- """Called by cecp.Connection"""
- print self.player.claimDraw()
-
- def logText(self, text, style):
- """Called by cecp.Connection"""
- self.player.logText(text, style)
-
- class UCIConnection(uci.StateMachine):
- """
- """
-
- def __init__(self, player):
- """
- """
- self.player = player
- uci.StateMachine.__init__(self)
-
- def onOutgoingData(self, data):
- """Called by uci.StateMachine"""
- self.player.logText(data, 'output')
- self.player.sendToEngine(data)
-
- def logText(self, text, style):
- """Called by uci.StateMachine"""
- self.player.logText(text, style)
-
- def onMove(self, move):
- """Called by uci.StateMachine"""
- self.player.move(move)
-
- # Catch zombie processes
- def _cDied(sig, stackFrame):
- try:
- (pid, status) = os.waitpid(-1, os.WNOHANG)
- except OSError:
- pass
- signal.signal(signal.SIGCHLD, _cDied)
-
- class Player(game.ChessPlayer):
- """
- """
-
- def __init__(self, name, profile, level = 'normal'):
- """Constructor for an AI player.
-
- 'name' is the name of the player (string).
- 'profile' is the profile to use for the AI (Profile).
- 'level' is the difficulty level to use (string).
- """
- self.__profile = profile
- self.__level = level
-
- self.moving = False
- self.suppliedMove = None
-
- game.ChessPlayer.__init__(self, name)
-
- # Pipe to communicate to engine with
- (toManagerOutput, toManagerInput) = os.pipe()
- (fromManagerOutput, fromManagerInput) = os.pipe()
-
- # Store the file descripter for reading/writing
- self.__toEngineFd = toManagerInput
- self.__fromEngineFd = fromManagerOutput
-
- # Fork off a child process to manage the engine
- try:
- self.__pid = os.fork()
- except OSError, e:
- print 'Monitor failed to fork: %s' % e.message
- os.close(toManagerInput)
- os.close(toManagerOutput)
- os.close(fromManagerInput)
- os.close(fromManagerOutput)
- self.__toEngineFd = None
- self.__fromEngineFd = None
- else:
- if self.__pid == 0:
- os.close(toManagerInput)
- os.close(fromManagerOutput)
- self._runMonitor(fromManagerInput, toManagerOutput)
- os.close(toManagerOutput)
- os.close(fromManagerInput)
- os._exit(0)
-
- os.close(toManagerOutput)
- os.close(fromManagerInput)
-
- if profile.protocol == CECP:
- self.connection = CECPConnection(self)
- elif profile.protocol == UCI:
- self.connection = UCIConnection(self)
- else:
- assert(False)
-
- self.connection.start()
- self.connection.startGame()
- try:
- level = self.__profile.levels[self.__level]
- except KeyError:
- self.connection.configure()
- else:
- self.connection.configure(level.options)
-
- # Methods to extend
-
- def logText(self, text, style):
- """
- """
- pass
-
- # Public methods
-
- def getProfile(self):
- """Get the AI profile this AI is using.
-
- Returns a 2-tuple containing the profile name (str) and the difficulty level (str).
- """
- return (self.__profile.name, self.__level)
-
- def fileno(self):
- """Returns the file descriptor for communicating with the engine (integer)"""
- return self.__fromEngineFd
-
- def read(self):
- """Read and process data from the engine.
-
- This is non-blocking.
- """
- while True:
- # Connection was closed
- if self.__fromEngineFd == None:
- return False
-
- # Check if data is available
- (rlist, _, xlist) = select.select([self.__fromEngineFd], [], [self.__fromEngineFd], 0)
- if (len(rlist) + len(xlist)) == 0:
- return True
-
- # Read a chunk and process
- try:
- data = os.read(self.__fromEngineFd, 256)
- except OSError, e:
- print 'Error reading from chess engine: ' + str(e)
- self._die()
- return False
- if len(data) == 0:
- print 'Engine has died'
- self._die()
- return False
- self.connection.registerIncomingData(data)
-
- def sendToEngine(self, data):
- """
- """
- if self.__toEngineFd == None:
- return
-
- try:
- os.write(self.__toEngineFd, data)
- except OSError, e:
- print 'Failed to write to engine: ' + str(e)
-
- def quit(self):
- """Disconnect the AI"""
- fd = self.__toEngineFd
- self.__toEngineFd = None
- self.__fromEngineFd = None
-
- # Send quit
- try:
- if fd is not None:
- os.write(fd, '\nquit\n') # FIXME: CECP specific
- except OSError:
- return
-
- # Extended methods
-
- def onPlayerMoved(self, player, move):
- """Called by game.ChessPlayer"""
- if self.__toEngineFd == None:
- return
- isSelf = player is self and self.moving
- self.moving = False
- self.connection.reportMove(move.canMove, isSelf)
-
- def onUndoMove(self):
- """Called by game.ChessPlayer"""
- self.connection.undoMove()
-
- def readyToMove(self):
- """Called by game.ChessPlayer"""
- if self.__toEngineFd == None:
- self.die()
- return
-
- # AI Engines always claim draw due to three-fold-repetition and
- # 50 move rule
- if self.claimDraw():
- return
-
- game = self.getGame()
- whiteTime = game.getWhite().getRemainingTime()
- blackTime = game.getBlack().getRemainingTime()
- if game.getWhite() is self:
- ownTime = whiteTime
- else:
- ownTime = blackTime
-
- if self.suppliedMove is None:
- self.connection.requestMove(whiteTime, blackTime, ownTime)
- else:
- self.moving = True
- move = self.suppliedMove
- self.suppliedMove = None
- self.move(move)
-
- # AI Engines always claim draw due to three-fold-repetition and
- # 50 move rule
- self.claimDraw()
-
- def onGameEnded(self, game):
- """Called by game.ChessPlayer"""
- self.quit()
-
- def _die(self):
- self.quit()
- self.die()
-
- def _runEngine(self, toEngineFd, fromEngineFd):
- # Make the engine low priority for CPU usage
- os.nice(19)
-
- # Change directory so any log files are not in the users home directory
- try:
- os.mkdir(LOG_DIR)
- except OSError:
- pass
- try:
- os.chdir(LOG_DIR)
- except OSError:
- pass
-
- # Connect stdin, stdout and stderr to the manager process
- os.dup2(toEngineFd, sys.stdin.fileno())
- os.dup2(fromEngineFd, sys.stdout.fileno())
- os.dup2(fromEngineFd, sys.stderr.fileno())
-
- # Execute the engine
- try:
- os.execv(self.__profile.path, [self.__profile.path] + self.__profile.arguments)
- except OSError:
- pass
-
- def _runMonitor(self, toApplicationFd, fromApplicationFd):
- # Make pipes to the child process
- (toEngineOutput, toEngineInput) = os.pipe()
- (fromEngineOutput, fromEngineInput) = os.pipe()
-
- # Fork and execute the child
- try:
- enginePID = os.fork()
- except OSError, e:
- print 'Monitor failed to fork: %s' % e.message
- os._exit(1);
- if enginePID == 0:
- os.close(toApplicationFd)
- os.close(fromApplicationFd)
- os.close(toEngineInput)
- os.close(fromEngineOutput)
- self._runEngine(toEngineOutput, fromEngineInput)
- os._exit(0)
- else:
- os.close(toEngineOutput)
- os.close(fromEngineInput)
-
- # Forward data between the application and the child process and wait for closed pipes
- inputPipes = (fromApplicationFd, fromEngineOutput)
- targets = {fromApplicationFd: toEngineInput,
- fromEngineOutput: toApplicationFd}
- pipes = (toApplicationFd, fromApplicationFd,
- toEngineInput, fromEngineOutput)
-
- try:
- while True:
- # Wait for data
- (rfds, _, xfds) = select.select(inputPipes, [], pipes, None)
-
- for fd in rfds:
- data = os.read(fd, 65535)
- if len(data) == 0:
- raise OSError('End of data')
-
- # Bridge information between the application and the engine
- os.write(targets[fd], data)
- except:
- # Kill the child and exit
- try:
- os.kill(enginePID, signal.SIGQUIT)
- except OSError:
- pass
- os._exit(0)
-